/*
 * Copyright 2009-2015 MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * This file contains C implementations of some of the functions
 * needed by the message module. If possible, these implementations
 * should be used to speed up message creation.
 */

#include "Python.h"

#include "_cbsonmodule.h"
#include "buffer.h"

/* Per-module state for this extension. */
struct module_state {
    /* Handle passed as the first argument to the encoding helpers used in
     * this file (write_dict, write_pair, decode_and_write_pair).
     * NOTE(review): presumably the imported bson._cbson module object -
     * confirm against the module init code. */
    PyObject* _cbson;
};

/* See comments about module initialization in _cbsonmodule.c */
/* GETSTATE(m) yields a pointer to the module_state for module m. On
 * Python 3 it is fetched from the module object with PyModule_GetState;
 * on Python 2 there is a single file-level static instance and the
 * argument m is ignored. */
#if PY_MAJOR_VERSION >= 3
#define GETSTATE(m) ((struct module_state*)PyModule_GetState(m))
#else
#define GETSTATE(m) (&_state)
static struct module_state _state;
#endif

/* Py_BuildValue / PyObject_CallMethod format unit used to turn a
 * (pointer, length) pair into a Python byte string: "y#" (bytes) on
 * Python 3, "s#" (str) on Python 2. */
#if PY_MAJOR_VERSION >= 3
#define BYTES_FORMAT_STRING "y#"
#else
#define BYTES_FORMAT_STRING "s#"
#endif

/* Format for the DocumentTooLarge error message. Takes two arguments:
 * the encoded document size as an int (%d) and the maximum supported
 * size as a long (%ld). */
#define DOC_TOO_LARGE_FMT "BSON document too large (%d bytes)" \
                          " - the connected server supports" \
                          " BSON document sizes up to %ld bytes."

/* Look up an exception class by name in the pymongo.errors module.
 *
 * name: attribute name of the class inside pymongo.errors.
 *
 * Returns a new reference, or NULL if either the import or the attribute
 * lookup fails (the failing call leaves a Python exception set). */
static PyObject* _error(char* name) {
    PyObject* klass = NULL;
    PyObject* module = PyImport_ImportModule("pymongo.errors");
    if (module != NULL) {
        klass = PyObject_GetAttrString(module, name);
        /* The module reference is only needed for the lookup. */
        Py_DECREF(module);
    }
    return klass;
}

/* Append a "getlasterror" command message to the end of the buffer.
 *
 * self:       module object, used to reach the module state.
 * buffer:     buffer to append to.
 * request_id: request id written into the message header.
 * ns, nslen:  namespace string and its length; only the part before the
 *             first '.' (the database name) is written.
 * options:    codec options used when encoding the command document.
 * args:       dict of extra getlasterror options, iterated with
 *             PyDict_Next and appended to the command document.
 *
 * Returns 1 on success, 0 on failure. */
static int add_last_error(PyObject* self, buffer_t buffer,
                          int request_id, char* ns, int nslen,
                          codec_options_t* options, PyObject* args) {
    struct module_state *state = GETSTATE(self);

    int msg_offset;
    int doc_offset;
    int end_offset;
    int wrote_one;
    PyObject* opt_key;
    PyObject* opt_value;
    PyObject* one;
    Py_ssize_t dict_pos = 0;

    /* Truncate the namespace at the first '.', keeping only the database
     * portion; a namespace without a '.' is used whole. */
    char *dot = strchr(ns, '.');
    if (dot) {
        nslen = (int)(dot - ns);
    }

    /* Reserve 4 bytes for the total message length, patched at the end. */
    msg_offset = buffer_save_space(buffer, 4);
    if (msg_offset == -1) {
        PyErr_NoMemory();
        return 0;
    }

    if (!buffer_write_int32(buffer, (int32_t)request_id)) {
        return 0;
    }
    if (!buffer_write_bytes(buffer,
                            "\x00\x00\x00\x00"  /* responseTo */
                            "\xd4\x07\x00\x00"  /* opcode */
                            "\x00\x00\x00\x00", /* options */
                            12)) {
        return 0;
    }
    /* database name, without a terminator; ".$cmd\0" follows directly */
    if (!buffer_write_bytes(buffer, ns, nslen)) {
        return 0;
    }
    if (!buffer_write_bytes(buffer,
                            ".$cmd\x00"         /* collection name */
                            "\x00\x00\x00\x00"  /* skip */
                            "\xFF\xFF\xFF\xFF", /* limit (-1) */
                            14)) {
        return 0;
    }

    /* Reserve 4 bytes for the command document length. */
    doc_offset = buffer_save_space(buffer, 4);
    if (doc_offset == -1) {
        PyErr_NoMemory();
        return 0;
    }

    /* First element of the command document: getlasterror: 1 */
    one = PyLong_FromLong(1);
    if (!one) {
        return 0;
    }
    wrote_one = write_pair(state->_cbson, buffer, "getlasterror", 12, one, 0,
                           options, 1);
    Py_DECREF(one);
    if (!wrote_one) {
        return 0;
    }

    /* Remaining elements: the caller supplied getlasterror options. */
    while (PyDict_Next(args, &dict_pos, &opt_key, &opt_value)) {
        if (!decode_and_write_pair(state->_cbson, buffer, opt_key, opt_value,
                                   0, options, 0)) {
            return 0;
        }
    }

    /* End-of-document marker. */
    if (!buffer_write_bytes(buffer, "\x00", 1)) {
        return 0;
    }

    /* Patch both reserved length fields now that the end is known. */
    end_offset = buffer_get_position(buffer);
    buffer_write_int32_at_position(
        buffer, msg_offset, (int32_t)(end_offset - msg_offset));
    buffer_write_int32_at_position(
        buffer, doc_offset, (int32_t)(end_offset - doc_offset));
    return 1;
}

/* Write the header of an insert message into buffer.
 *
 * Reserves 4 bytes for the message length, then writes the request id,
 * a zero responseTo, the opcode bytes d2 07 00 00, the options word and
 * the collection name including its trailing NUL byte (coll_name_len + 1
 * bytes are copied).
 *
 * Returns the offset of the reserved length field, or -1 on failure. */
static int init_insert_buffer(buffer_t buffer, int request_id, int options,
                              const char* coll_name, int coll_name_len) {
    int header_written;
    int msg_len_offset = buffer_save_space(buffer, 4);
    if (msg_len_offset == -1) {
        PyErr_NoMemory();
        return -1;
    }

    header_written =
        buffer_write_int32(buffer, (int32_t)request_id) &&
        buffer_write_bytes(buffer,
                           "\x00\x00\x00\x00"   /* responseTo */
                           "\xd2\x07\x00\x00",  /* opcode */
                           8) &&
        buffer_write_int32(buffer, (int32_t)options) &&
        buffer_write_bytes(buffer, coll_name, coll_name_len + 1);

    return header_written ? msg_len_offset : -1;
}

/* Build one insert message for all of the given documents, optionally
 * followed by a getlasterror command.
 *
 * Python arguments (format "et#ObbObO&"): collection name (encoded as
 * utf-8), iterable of documents, check_keys, safe, getlasterror options,
 * continue_on_error, codec options.
 *
 * Returns a new tuple (request_id, message bytes, size of the largest
 * encoded document), or NULL with an exception set.
 *
 * Used by the Bulk API to insert into pre-2.6 servers. Collection.insert
 * uses _cbson_do_batched_insert. */
static PyObject* _cbson_insert_message(PyObject* self, PyObject* args) {
    struct module_state *state = GETSTATE(self);

    /* NOTE just using a random number as the request_id */
    int request_id = rand();
    char* collection_name = NULL;
    int collection_name_length;
    PyObject* docs;
    PyObject* doc;
    PyObject* doc_iter;
    PyObject* last_error_args;
    PyObject* InvalidOperation;
    PyObject* result = NULL;
    int doc_start;
    int doc_size;
    int largest_doc = 0;
    int flags;
    int length_location;
    int message_length;
    int encoded;
    unsigned char check_keys;
    unsigned char safe;
    unsigned char continue_on_error;
    codec_options_t options;
    buffer_t buffer;

    if (!PyArg_ParseTuple(args, "et#ObbObO&",
                          "utf-8",
                          &collection_name,
                          &collection_name_length,
                          &docs, &check_keys, &safe,
                          &last_error_args,
                          &continue_on_error,
                          convert_codec_options, &options)) {
        return NULL;
    }

    /* The flags word is 1 when continue_on_error is set, otherwise 0. */
    flags = continue_on_error ? 1 : 0;

    buffer = buffer_new();
    if (!buffer) {
        PyErr_NoMemory();
        goto release_args;
    }

    length_location = init_insert_buffer(buffer,
                                         request_id,
                                         flags,
                                         collection_name,
                                         collection_name_length);
    if (length_location == -1) {
        goto release_buffer;
    }

    doc_iter = PyObject_GetIter(docs);
    if (!doc_iter) {
        InvalidOperation = _error("InvalidOperation");
        if (InvalidOperation) {
            PyErr_SetString(InvalidOperation, "input is not iterable");
            Py_DECREF(InvalidOperation);
        }
        goto release_buffer;
    }

    /* Encode each document in turn, tracking the largest encoded size. */
    for (doc = PyIter_Next(doc_iter);
         doc != NULL;
         doc = PyIter_Next(doc_iter)) {
        doc_start = buffer_get_position(buffer);
        encoded = write_dict(state->_cbson, buffer, doc, check_keys,
                             &options, 1);
        Py_DECREF(doc);
        if (!encoded) {
            Py_DECREF(doc_iter);
            goto release_buffer;
        }
        doc_size = buffer_get_position(buffer) - doc_start;
        if (doc_size > largest_doc) {
            largest_doc = doc_size;
        }
    }
    Py_DECREF(doc_iter);

    /* PyIter_Next returns NULL both at exhaustion and on error. */
    if (PyErr_Occurred()) {
        goto release_buffer;
    }

    /* No document was encoded, so there is nothing to insert. */
    if (!largest_doc) {
        InvalidOperation = _error("InvalidOperation");
        if (InvalidOperation) {
            PyErr_SetString(InvalidOperation, "cannot do an empty bulk insert");
            Py_DECREF(InvalidOperation);
        }
        goto release_buffer;
    }

    /* Patch the length field reserved by init_insert_buffer. */
    message_length = buffer_get_position(buffer) - length_location;
    buffer_write_int32_at_position(
        buffer, length_location, (int32_t)message_length);

    if (safe && !add_last_error(self, buffer, request_id, collection_name,
                                collection_name_length, &options,
                                last_error_args)) {
        goto release_buffer;
    }

    /* objectify buffer */
    result = Py_BuildValue("i" BYTES_FORMAT_STRING "i", request_id,
                           buffer_get_buffer(buffer),
                           buffer_get_position(buffer),
                           largest_doc);

release_buffer:
    buffer_free(buffer);
release_args:
    destroy_codec_options(&options);
    PyMem_Free(collection_name);
    return result;
}

/* Build an update message, optionally followed by a getlasterror command.
 *
 * Python arguments (format "et#bbOObObO&"): collection name (encoded as
 * utf-8), upsert, multi, spec document, update document, safe,
 * getlasterror options, check_keys, codec options.
 *
 * check_keys is applied to the update document only; the spec document
 * is always encoded with key checking off.
 *
 * Returns a new tuple (request_id, message bytes, size of the larger of
 * the two encoded documents), or NULL with an exception set. */
static PyObject* _cbson_update_message(PyObject* self, PyObject* args) {
    struct module_state *state = GETSTATE(self);

    /* NOTE just using a random number as the request_id */
    int request_id = rand();
    char* collection_name = NULL;
    int collection_name_length;
    int before, cur_size, max_size = 0;
    PyObject* doc;
    PyObject* spec;
    unsigned char multi;
    unsigned char upsert;
    unsigned char safe;
    unsigned char check_keys;
    codec_options_t options;
    PyObject* last_error_args;
    int flags;
    buffer_t buffer;
    int length_location, message_length;
    PyObject* result = NULL;

    if (!PyArg_ParseTuple(args, "et#bbOObObO&",
                          "utf-8",
                          &collection_name,
                          &collection_name_length,
                          &upsert, &multi, &spec, &doc, &safe,
                          &last_error_args, &check_keys,
                          convert_codec_options, &options)) {
        return NULL;
    }

    /* Flags word: 1 for upsert, 2 for multi. */
    flags = 0;
    if (upsert) {
        flags += 1;
    }
    if (multi) {
        flags += 2;
    }

    buffer = buffer_new();
    if (!buffer) {
        PyErr_NoMemory();
        goto release_args;
    }

    /* save space for message length */
    length_location = buffer_save_space(buffer, 4);
    if (length_location == -1) {
        PyErr_NoMemory();
        /* The buffer must be released on this path too. */
        goto release_buffer;
    }
    if (!buffer_write_int32(buffer, (int32_t)request_id) ||
        !buffer_write_bytes(buffer,
                            "\x00\x00\x00\x00"  /* responseTo */
                            "\xd1\x07\x00\x00"  /* opcode */
                            "\x00\x00\x00\x00", /* reserved zero word */
                            12) ||
        !buffer_write_bytes(buffer,
                            collection_name,
                            collection_name_length + 1) ||
        !buffer_write_int32(buffer, (int32_t)flags)) {
        goto release_buffer;
    }

    /* Encode the spec (selector) document. */
    before = buffer_get_position(buffer);
    if (!write_dict(state->_cbson, buffer, spec, 0, &options, 1)) {
        goto release_buffer;
    }
    max_size = buffer_get_position(buffer) - before;

    /* Encode the update document. */
    before = buffer_get_position(buffer);
    if (!write_dict(state->_cbson, buffer, doc, check_keys,
                    &options, 1)) {
        goto release_buffer;
    }
    cur_size = buffer_get_position(buffer) - before;
    max_size = (cur_size > max_size) ? cur_size : max_size;

    /* Patch the reserved message length field. */
    message_length = buffer_get_position(buffer) - length_location;
    buffer_write_int32_at_position(
        buffer, length_location, (int32_t)message_length);

    if (safe) {
        if (!add_last_error(self, buffer, request_id, collection_name,
                            collection_name_length, &options,
                            last_error_args)) {
            goto release_buffer;
        }
    }

    /* objectify buffer */
    result = Py_BuildValue("i" BYTES_FORMAT_STRING "i", request_id,
                           buffer_get_buffer(buffer),
                           buffer_get_position(buffer),
                           max_size);

release_buffer:
    buffer_free(buffer);
release_args:
    destroy_codec_options(&options);
    PyMem_Free(collection_name);
    return result;
}

/* Build a query message.
 *
 * Python arguments (format "Iet#iiOOO&|b"): flags, collection name
 * (encoded as utf-8), number to skip, number to return, query document,
 * field selector (or None), codec options, and optionally check_keys
 * (defaults to 0).
 *
 * check_keys is applied to the query document only. The field selector
 * is encoded only when it is not None, always with key checking off.
 *
 * Returns a new tuple (request_id, message bytes, size of the largest
 * encoded document), or NULL with an exception set. */
static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
    struct module_state *state = GETSTATE(self);

    /* NOTE just using a random number as the request_id */
    int request_id = rand();
    unsigned int flags;
    char* collection_name = NULL;
    int collection_name_length;
    int begin, cur_size, max_size = 0;
    int num_to_skip;
    int num_to_return;
    PyObject* query;
    PyObject* field_selector;
    codec_options_t options;
    buffer_t buffer;
    int length_location, message_length;
    unsigned char check_keys = 0;
    PyObject* result = NULL;

    if (!PyArg_ParseTuple(args, "Iet#iiOOO&|b",
                          &flags,
                          "utf-8",
                          &collection_name,
                          &collection_name_length,
                          &num_to_skip, &num_to_return,
                          &query, &field_selector,
                          convert_codec_options, &options,
                          &check_keys)) {
        return NULL;
    }

    buffer = buffer_new();
    if (!buffer) {
        PyErr_NoMemory();
        goto release_args;
    }

    /* save space for message length */
    length_location = buffer_save_space(buffer, 4);
    if (length_location == -1) {
        PyErr_NoMemory();
        /* The buffer must be released on this path too. */
        goto release_buffer;
    }
    if (!buffer_write_int32(buffer, (int32_t)request_id) ||
        !buffer_write_bytes(buffer,
                            "\x00\x00\x00\x00"   /* responseTo */
                            "\xd4\x07\x00\x00",  /* opcode */
                            8) ||
        !buffer_write_int32(buffer, (int32_t)flags) ||
        !buffer_write_bytes(buffer, collection_name,
                            collection_name_length + 1) ||
        !buffer_write_int32(buffer, (int32_t)num_to_skip) ||
        !buffer_write_int32(buffer, (int32_t)num_to_return)) {
        goto release_buffer;
    }

    /* Encode the query document. */
    begin = buffer_get_position(buffer);
    if (!write_dict(state->_cbson, buffer, query, check_keys, &options, 1)) {
        goto release_buffer;
    }
    max_size = buffer_get_position(buffer) - begin;

    /* Encode the optional field selector. */
    if (field_selector != Py_None) {
        begin = buffer_get_position(buffer);
        if (!write_dict(state->_cbson, buffer, field_selector, 0,
                        &options, 1)) {
            goto release_buffer;
        }
        cur_size = buffer_get_position(buffer) - begin;
        max_size = (cur_size > max_size) ? cur_size : max_size;
    }

    /* Patch the reserved message length field. */
    message_length = buffer_get_position(buffer) - length_location;
    buffer_write_int32_at_position(
        buffer, length_location, (int32_t)message_length);

    /* objectify buffer */
    result = Py_BuildValue("i" BYTES_FORMAT_STRING "i", request_id,
                           buffer_get_buffer(buffer),
                           buffer_get_position(buffer),
                           max_size);

release_buffer:
    buffer_free(buffer);
release_args:
    destroy_codec_options(&options);
    PyMem_Free(collection_name);
    return result;
}

/* Build a get-more message.
 *
 * Python arguments (format "et#iL"): collection name (encoded as utf-8),
 * number to return, cursor id (64-bit).
 *
 * Returns a new tuple (request_id, message bytes), or NULL with an
 * exception set. */
static PyObject* _cbson_get_more_message(PyObject* self, PyObject* args) {
    /* NOTE just using a random number as the request_id */
    int request_id = rand();
    char* collection_name = NULL;
    int collection_name_length;
    int num_to_return;
    long long cursor_id;
    buffer_t buffer;
    int length_location, message_length;
    PyObject* result = NULL;

    if (!PyArg_ParseTuple(args, "et#iL",
                          "utf-8",
                          &collection_name,
                          &collection_name_length,
                          &num_to_return,
                          &cursor_id)) {
        return NULL;
    }

    buffer = buffer_new();
    if (!buffer) {
        PyErr_NoMemory();
        goto release_args;
    }

    /* save space for message length */
    length_location = buffer_save_space(buffer, 4);
    if (length_location == -1) {
        PyErr_NoMemory();
        /* The buffer must be released on this path too. */
        goto release_buffer;
    }
    if (!buffer_write_int32(buffer, (int32_t)request_id) ||
        !buffer_write_bytes(buffer,
                            "\x00\x00\x00\x00"  /* responseTo */
                            "\xd5\x07\x00\x00"  /* opcode */
                            "\x00\x00\x00\x00", /* reserved zero word */
                            12) ||
        !buffer_write_bytes(buffer,
                            collection_name,
                            collection_name_length + 1) ||
        !buffer_write_int32(buffer, (int32_t)num_to_return) ||
        !buffer_write_int64(buffer, (int64_t)cursor_id)) {
        goto release_buffer;
    }

    /* Patch the reserved message length field. */
    message_length = buffer_get_position(buffer) - length_location;
    buffer_write_int32_at_position(
        buffer, length_location, (int32_t)message_length);

    /* objectify buffer */
    result = Py_BuildValue("i" BYTES_FORMAT_STRING, request_id,
                           buffer_get_buffer(buffer),
                           buffer_get_position(buffer));

release_buffer:
    buffer_free(buffer);
release_args:
    PyMem_Free(collection_name);
    return result;
}

/* Set a pymongo.errors.DocumentTooLarge exception whose message is
 * DOC_TOO_LARGE_FMT formatted with the given sizes.
 *
 * size: encoded size of the offending document, in bytes.
 * max:  maximum document size reported in the message.
 *
 * If the exception class cannot be looked up, or the message cannot be
 * built, DocumentTooLarge is not set. */
static void
_set_document_too_large(int size, long max) {
    PyObject* message;
    PyObject* DocumentTooLarge = _error("DocumentTooLarge");
    if (!DocumentTooLarge) {
        return;
    }

#if PY_MAJOR_VERSION >= 3
    message = PyUnicode_FromFormat(DOC_TOO_LARGE_FMT, size, max);
#else
    message = PyString_FromFormat(DOC_TOO_LARGE_FMT, size, max);
#endif
    if (message) {
        PyErr_SetObject(DocumentTooLarge, message);
        Py_DECREF(message);
    }
    Py_DECREF(DocumentTooLarge);
}

/* Hand a finished insert message to ctx.legacy_write.
 *
 * When safe is non-zero a getlasterror command (built from gle_args) is
 * first appended to the buffer. The method is then called with
 * (request_id, message bytes, 0, bool(safe), to_publish).
 *
 * Returns the result of legacy_write as a new reference, or NULL with an
 * exception set. */
static PyObject*
_send_insert(PyObject* self, PyObject* ctx,
             PyObject* gle_args, buffer_t buffer,
             char* coll_name, int coll_len, int request_id, int safe,
             codec_options_t* options, PyObject* to_publish) {

    if (safe && !add_last_error(self, buffer, request_id,
                                coll_name, coll_len, options, gle_args)) {
        return NULL;
    }

    /* The max_doc_size parameter for legacy_write is the max size of any
     * document in buffer. We enforced max size already, pass 0 here.
     * The "N" unit hands the new bool reference over to the call. */
    return PyObject_CallMethod(ctx, "legacy_write",
                               "i" BYTES_FORMAT_STRING "iNO",
                               request_id,
                               buffer_get_buffer(buffer),
                               buffer_get_position(buffer),
                               0,
                               PyBool_FromLong((long)safe),
                               to_publish);
}

/* Encode documents into one or more insert messages and send each one
 * through ctx.legacy_write as soon as it is full.
 *
 * Python arguments (format "et#ObbObO&O"): collection name (encoded as
 * utf-8), iterable of documents, check_keys, safe, getlasterror options,
 * continue_on_error, codec options, ctx.
 *
 * ctx must provide integer attributes max_bson_size and max_message_size
 * and a legacy_write method. A document larger than max_bson_size raises
 * DocumentTooLarge (after sending whatever was already encoded). When the
 * buffer grows past max_message_size, the batch without the latest
 * document is sent and that document starts a new batch.
 *
 * Returns None on success, NULL with an exception set on failure.
 *
 * The codec options are released on every exit path taken after argument
 * parsing succeeds. */
static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
    struct module_state *state = GETSTATE(self);

    /* NOTE just using a random number as the request_id */
    int request_id = rand();
    int send_safe, flags = 0;
    int length_location, message_length;
    int collection_name_length;
    char* collection_name = NULL;
    PyObject* docs;
    PyObject* doc;
    PyObject* iterator;
    PyObject* ctx;
    PyObject* last_error_args;
    PyObject* result;
    PyObject* max_bson_size_obj;
    PyObject* max_message_size_obj;
    PyObject* to_publish = NULL;
    unsigned char check_keys;
    unsigned char safe;
    unsigned char continue_on_error;
    codec_options_t options;
    unsigned char empty = 1;
    long max_bson_size;
    long max_message_size;
    buffer_t buffer;
    /* Error saved from an earlier batch when continue_on_error is set;
     * re-raised after the final batch. */
    PyObject *exc_type = NULL, *exc_value = NULL, *exc_trace = NULL;

    if (!PyArg_ParseTuple(args, "et#ObbObO&O",
                          "utf-8",
                          &collection_name,
                          &collection_name_length,
                          &docs, &check_keys, &safe,
                          &last_error_args,
                          &continue_on_error,
                          convert_codec_options, &options,
                          &ctx)) {
        return NULL;
    }
    if (continue_on_error) {
        flags += 1;
    }
    /*
     * If we are doing unacknowledged writes *and* continue_on_error
     * is True it's pointless (and slower) to send GLE.
     */
    send_safe = (safe || !continue_on_error);
    max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size");
#if PY_MAJOR_VERSION >= 3
    max_bson_size = PyLong_AsLong(max_bson_size_obj);
#else
    max_bson_size = PyInt_AsLong(max_bson_size_obj);
#endif
    Py_XDECREF(max_bson_size_obj);
    if (max_bson_size == -1) {
        destroy_codec_options(&options);
        PyMem_Free(collection_name);
        return NULL;
    }

    max_message_size_obj = PyObject_GetAttrString(ctx, "max_message_size");
#if PY_MAJOR_VERSION >= 3
    max_message_size = PyLong_AsLong(max_message_size_obj);
#else
    max_message_size = PyInt_AsLong(max_message_size_obj);
#endif
    Py_XDECREF(max_message_size_obj);
    if (max_message_size == -1) {
        destroy_codec_options(&options);
        PyMem_Free(collection_name);
        return NULL;
    }

    buffer = buffer_new();
    if (!buffer) {
        destroy_codec_options(&options);
        PyErr_NoMemory();
        PyMem_Free(collection_name);
        return NULL;
    }

    length_location = init_insert_buffer(buffer,
                                         request_id,
                                         flags,
                                         collection_name,
                                         collection_name_length);
    if (length_location == -1) {
        goto insertfail;
    }

    /* Documents in the current batch, passed along to legacy_write. */
    if (!(to_publish = PyList_New(0))) {
        goto insertfail;
    }

    iterator = PyObject_GetIter(docs);
    if (iterator == NULL) {
        PyObject* InvalidOperation = _error("InvalidOperation");
        if (InvalidOperation) {
            PyErr_SetString(InvalidOperation, "input is not iterable");
            Py_DECREF(InvalidOperation);
        }
        goto insertfail;
    }
    while ((doc = PyIter_Next(iterator)) != NULL) {
        int before = buffer_get_position(buffer);
        int cur_size;
        if (!write_dict(state->_cbson, buffer, doc, check_keys,
                        &options, 1)) {
            goto iterfail;
        }

        cur_size = buffer_get_position(buffer) - before;
        if (cur_size > max_bson_size) {
            /* If we've encoded anything send it before raising. */
            if (!empty) {
                /* Drop the oversized document from the buffer first. */
                buffer_update_position(buffer, before);
                message_length = buffer_get_position(buffer) - length_location;
                buffer_write_int32_at_position(
                    buffer, length_location, (int32_t)message_length);
                result = _send_insert(self, ctx, last_error_args, buffer,
                                      collection_name, collection_name_length,
                                      request_id, send_safe, &options,
                                      to_publish);
                if (!result)
                    goto iterfail;
                Py_DECREF(result);
            }
            _set_document_too_large(cur_size, max_bson_size);
            goto iterfail;
        }
        empty = 0;

        /* We have enough data, send this batch. */
        if (buffer_get_position(buffer) > max_message_size) {
            int new_request_id = rand();
            int message_start;
            buffer_t new_buffer = buffer_new();
            if (!new_buffer) {
                PyErr_NoMemory();
                goto iterfail;
            }
            message_start = init_insert_buffer(new_buffer,
                                               new_request_id,
                                               flags,
                                               collection_name,
                                               collection_name_length);
            if (message_start == -1) {
                buffer_free(new_buffer);
                goto iterfail;
            }

            /* Copy the overflow encoded document into the new buffer. */
            if (!buffer_write_bytes(new_buffer,
                (const char*)buffer_get_buffer(buffer) + before, cur_size)) {
                buffer_free(new_buffer);
                goto iterfail;
            }

            /* Roll back to the beginning of this document. */
            buffer_update_position(buffer, before);
            message_length = buffer_get_position(buffer) - length_location;
            buffer_write_int32_at_position(
                buffer, length_location, (int32_t)message_length);

            result = _send_insert(self, ctx, last_error_args, buffer,
                                  collection_name, collection_name_length,
                                  request_id, send_safe, &options, to_publish);

            /* The new buffer becomes the current batch whether or not
             * the send succeeded; result is examined below. */
            buffer_free(buffer);
            buffer = new_buffer;
            request_id = new_request_id;
            length_location = message_start;

            Py_DECREF(to_publish);
            if (!(to_publish = PyList_New(0))) {
                /* doc and iterator are still owned here, and result may
                 * hold a reference: release them all via iterfail. */
                Py_XDECREF(result);
                goto iterfail;
            }

            if (!result) {
                PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
                PyObject* OperationFailure;
                PyErr_Fetch(&etype, &evalue, &etrace);
                OperationFailure = _error("OperationFailure");
                if (OperationFailure) {
                    if (PyErr_GivenExceptionMatches(etype, OperationFailure)) {
                        if (!safe || continue_on_error) {
                            Py_DECREF(OperationFailure);
                            if (!safe) {
                                /* We're doing unacknowledged writes and
                                 * continue_on_error is False. Just return. */
                                Py_DECREF(etype);
                                Py_XDECREF(evalue);
                                Py_XDECREF(etrace);
                                Py_DECREF(to_publish);
                                Py_DECREF(iterator);
                                Py_DECREF(doc);
                                buffer_free(buffer);
                                PyMem_Free(collection_name);
                                destroy_codec_options(&options);
                                Py_RETURN_NONE;
                            }
                            /* continue_on_error is True, store the error
                             * details to re-raise after the final batch */
                            Py_XDECREF(exc_type);
                            Py_XDECREF(exc_value);
                            Py_XDECREF(exc_trace);
                            exc_type = etype;
                            exc_value = evalue;
                            exc_trace = etrace;
                            if (PyList_Append(to_publish, doc) < 0) {
                                goto iterfail;
                            }
                            Py_CLEAR(doc);
                            continue;
                        }
                    }
                    Py_DECREF(OperationFailure);
                }
                /* This isn't OperationFailure, we couldn't
                 * import OperationFailure, or we are doing
                 * acknowledged writes. Re-raise immediately. */
                PyErr_Restore(etype, evalue, etrace);
                goto iterfail;
            } else {
                Py_DECREF(result);
            }
        }
        if (PyList_Append(to_publish, doc) < 0) {
            goto iterfail;
        }
        Py_CLEAR(doc);
    }
    Py_DECREF(iterator);

    /* PyIter_Next returns NULL both at exhaustion and on error. */
    if (PyErr_Occurred()) {
        goto insertfail;
    }

    if (empty) {
        PyObject* InvalidOperation = _error("InvalidOperation");
        if (InvalidOperation) {
            PyErr_SetString(InvalidOperation, "cannot do an empty bulk insert");
            Py_DECREF(InvalidOperation);
        }
        goto insertfail;
    }

    message_length = buffer_get_position(buffer) - length_location;
    buffer_write_int32_at_position(
        buffer, length_location, (int32_t)message_length);

    /* Send the last (or only) batch */
    result = _send_insert(self, ctx, last_error_args, buffer,
                          collection_name, collection_name_length,
                          request_id, safe, &options, to_publish);

    Py_DECREF(to_publish);
    PyMem_Free(collection_name);
    buffer_free(buffer);
    destroy_codec_options(&options);

    if (!result) {
        Py_XDECREF(exc_type);
        Py_XDECREF(exc_value);
        Py_XDECREF(exc_trace);
        return NULL;
    } else {
        Py_DECREF(result);
    }

    if (exc_type) {
        /* Re-raise any previously stored exception
         * due to continue_on_error being True */
        PyErr_Restore(exc_type, exc_value, exc_trace);
        return NULL;
    }

    Py_RETURN_NONE;

iterfail:
    Py_XDECREF(doc);
    Py_DECREF(iterator);
insertfail:
    Py_XDECREF(exc_type);
    Py_XDECREF(exc_value);
    Py_XDECREF(exc_trace);
    Py_XDECREF(to_publish);
    buffer_free(buffer);
    PyMem_Free(collection_name);
    destroy_codec_options(&options);
    return NULL;
}

/* Finalize the length fields of a write command message and pass it to
 * ctx.write_command.
 *
 * lst_len_loc: offset of the length field of the document list; it is
 *              set to (end - lst_len_loc - 1).
 *              NOTE(review): the reason for the extra -1 depends on how
 *              the caller chose lst_len_loc, which is not visible here.
 * cmd_len_loc: offset of the length field of the command document; it is
 *              set to (end - cmd_len_loc).
 * errors:      set to 1 when the reply has a "writeErrors" key, otherwise
 *              left untouched.
 * to_publish:  passed through as the last argument of write_command.
 *
 * The total message length is written at offset 0 and a freshly drawn
 * random request id at offset 4.
 *
 * Returns the reply from write_command as a new reference, or NULL with
 * an exception set. */
static PyObject*
_send_write_command(PyObject* ctx, buffer_t buffer, int lst_len_loc,
                    int cmd_len_loc, unsigned char* errors,
                    PyObject* to_publish) {

    PyObject* reply;

    int request_id = rand();
    int end = buffer_get_position(buffer);

    buffer_write_int32_at_position(
        buffer, lst_len_loc, (int32_t)(end - lst_len_loc - 1));
    buffer_write_int32_at_position(
        buffer, cmd_len_loc, (int32_t)(end - cmd_len_loc));
    buffer_write_int32_at_position(buffer, 0, (int32_t)end);
    buffer_write_int32_at_position(buffer, 4, (int32_t)request_id);

    /* Send the current batch */
    reply = PyObject_CallMethod(ctx, "write_command",
                                "i" BYTES_FORMAT_STRING "O",
                                request_id,
                                buffer_get_buffer(buffer),
                                buffer_get_position(buffer),
                                to_publish);
    if (!reply) {
        return NULL;
    }
    if (PyDict_GetItemString(reply, "writeErrors")) {
        *errors = 1;
    }
    return reply;
}

/* Create a buffer that already holds the header of a command message
 * for the given namespace.
 *
 * The first 8 bytes (message length and request id) are reserved but not
 * filled in. ns_len + 1 bytes of ns are copied, so the terminating NUL is
 * included.
 *
 * Returns the new buffer, or NULL on failure. */
static buffer_t
_command_buffer_new(char* ns, int ns_len) {
    buffer_t cmd = buffer_new();
    if (!cmd) {
        PyErr_NoMemory();
        return NULL;
    }

    /* Save space for message length and request id */
    if (buffer_save_space(cmd, 8) == -1) {
        PyErr_NoMemory();
        buffer_free(cmd);
        return NULL;
    }

    if (buffer_write_bytes(cmd,
                           "\x00\x00\x00\x00"  /* responseTo */
                           "\xd4\x07\x00\x00"  /* opcode */
                           "\x00\x00\x00\x00", /* flags */
                           12) &&
        buffer_write_bytes(cmd,
                           ns, ns_len + 1) &&  /* namespace */
        buffer_write_bytes(cmd,
                           "\x00\x00\x00\x00"  /* skip */
                           "\xFF\xFF\xFF\xFF", /* limit (-1) */
                           8)) {
        return cmd;
    }

    buffer_free(cmd);
    return NULL;
}

/* Operation codes accepted as the `op` argument of
 * _cbson_do_batched_write_command; they select which array field name
 * ("documents", "updates" or "deletes") is written into the command. */
#define _INSERT 0
#define _UPDATE 1
#define _DELETE 2

/* Read the integer attribute `name` of `obj` into `*out`.
 *
 * Returns 1 on success, 0 on failure with a Python exception set (the
 * original AttributeError if the attribute is missing, or the conversion
 * error if it is not an integer). */
static int
_batched_write_long_attr(PyObject* obj, const char* name, long* out) {
    long value;
    PyObject* attr = PyObject_GetAttrString(obj, name);
    if (!attr) {
        return 0;
    }
#if PY_MAJOR_VERSION >= 3
    value = PyLong_AsLong(attr);
#else
    value = PyInt_AsLong(attr);
#endif
    Py_DECREF(attr);
    /* -1 is only an error if an exception is actually pending. */
    if (value == -1 && PyErr_Occurred()) {
        return 0;
    }
    *out = value;
    return 1;
}

/* Encode `docs` into one or more write commands and send each batch
 * through ctx.write_command().
 *
 * Python-level arguments (see the PyArg_ParseTuple format below):
 *   ns          - namespace, encoded as utf-8
 *   op          - _INSERT, _UPDATE or _DELETE
 *   command     - the command document written ahead of the batch array
 *   docs        - iterable of documents to send
 *   check_keys  - passed to write_dict for each document (forced to 0
 *                 for updates and deletes)
 *   opts        - codec options (converted by convert_codec_options)
 *   ctx         - object providing the max_bson_size and
 *                 max_write_batch_size attributes and a write_command
 *                 method
 *
 * A new batch is started whenever the encoded buffer grows beyond
 * max_bson_size + 16382 bytes or the current batch already holds
 * max_write_batch_size documents.
 *
 * Returns a new list of (idx_offset, result) tuples, one per batch sent,
 * where idx_offset is the number of documents sent in earlier batches.
 * If the command is ordered and a batch result contains "writeErrors",
 * the list built so far is returned immediately.
 *
 * Returns NULL with an exception set on failure. */
static PyObject*
_cbson_do_batched_write_command(PyObject* self, PyObject* args) {
    struct module_state *state = GETSTATE(self);

    long max_bson_size;
    long max_cmd_size;
    long max_write_batch_size;
    long idx_offset = 0;
    int idx = 0;
    int cmd_len_loc;
    int lst_len_loc;
    int ns_len;
    int ordered;
    char *ns = NULL;
    PyObject* command;
    PyObject* doc;
    PyObject* docs;
    PyObject* ctx;
    PyObject* iterator;
    PyObject* result;
    PyObject* results;
    PyObject* to_publish = NULL;
    unsigned char op;
    unsigned char check_keys;
    codec_options_t options;
    unsigned char empty = 1;
    unsigned char errors = 0;
    buffer_t buffer;

    if (!PyArg_ParseTuple(args, "et#bOObO&O", "utf-8",
                          &ns, &ns_len, &op, &command, &docs, &check_keys,
                          convert_codec_options, &options,
                          &ctx)) {
        return NULL;
    }

    if (!_batched_write_long_attr(ctx, "max_bson_size", &max_bson_size) ||
        !_batched_write_long_attr(ctx, "max_write_batch_size",
                                  &max_write_batch_size)) {
        destroy_codec_options(&options);
        PyMem_Free(ns);
        return NULL;
    }
    /*
     * Max BSON object size + 16k - 2 bytes for ending NUL bytes
     * XXX: This should come from the server - SERVER-10643
     */
    max_cmd_size = max_bson_size + 16382;

    /* Default to True */
    ordered = !((PyDict_GetItemString(command, "ordered")) == Py_False);

    if (!(results = PyList_New(0))) {
        destroy_codec_options(&options);
        PyMem_Free(ns);
        return NULL;
    }

    /* Documents of the current batch, handed to ctx.write_command. */
    if (!(to_publish = PyList_New(0))) {
        destroy_codec_options(&options);
        PyMem_Free(ns);
        Py_DECREF(results);
        return NULL;
    }

    if (!(buffer = _command_buffer_new(ns, ns_len))) {
        destroy_codec_options(&options);
        PyMem_Free(ns);
        Py_DECREF(results);
        Py_DECREF(to_publish);
        return NULL;
    }

    /* The namespace has been copied into the buffer. */
    PyMem_Free(ns);

    /* Position of command document length */
    cmd_len_loc = buffer_get_position(buffer);
    if (!write_dict(state->_cbson, buffer, command, 0,
                    &options, 0)) {
        goto cmdfail;
    }

    /* Write type byte for array (overwrites the last byte written by
     * write_dict so the batch array is appended to the command document). */
    *(buffer_get_buffer(buffer) + (buffer_get_position(buffer) - 1)) = 0x4;

    switch (op) {
    case _INSERT:
        {
            if (!buffer_write_bytes(buffer, "documents\x00", 10))
                goto cmdfail;
            break;
        }
    case _UPDATE:
        {
            /* MongoDB does key validation for update. */
            check_keys = 0;
            if (!buffer_write_bytes(buffer, "updates\x00", 8))
                goto cmdfail;
            break;
        }
    case _DELETE:
        {
            /* Never check keys in a delete command. */
            check_keys = 0;
            if (!buffer_write_bytes(buffer, "deletes\x00", 8))
                goto cmdfail;
            break;
        }
    default:
        {
            PyObject* InvalidOperation = _error("InvalidOperation");
            if (InvalidOperation) {
                PyErr_SetString(InvalidOperation, "Unknown command");
                Py_DECREF(InvalidOperation);
            }
            goto cmdfail;
        }
    }

    /* Save space for list document */
    lst_len_loc = buffer_save_space(buffer, 4);
    if (lst_len_loc == -1) {
        PyErr_NoMemory();
        goto cmdfail;
    }

    iterator = PyObject_GetIter(docs);
    if (iterator == NULL) {
        PyObject* InvalidOperation = _error("InvalidOperation");
        if (InvalidOperation) {
            PyErr_SetString(InvalidOperation, "input is not iterable");
            Py_DECREF(InvalidOperation);
        }
        goto cmdfail;
    }
    while ((doc = PyIter_Next(iterator)) != NULL) {
        /* Start of this array element (type byte + key + document). */
        int sub_doc_begin = buffer_get_position(buffer);
        int cur_doc_begin;
        int cur_size;
        int enough_data = 0;
        int enough_documents = 0;
        char key[16];
        empty = 0;
        /* The array key is the document's index within the current batch. */
        INT2STRING(key, idx);
        if (!buffer_write_bytes(buffer, "\x03", 1) ||
            !buffer_write_bytes(buffer, key, (int)strlen(key) + 1)) {
            goto cmditerfail;
        }
        cur_doc_begin = buffer_get_position(buffer);
        if (!write_dict(state->_cbson, buffer, doc,
                        check_keys, &options, 1)) {
            goto cmditerfail;
        }

        /* We have enough data, maybe send this batch. */
        enough_data = (buffer_get_position(buffer) > max_cmd_size);
        enough_documents = (idx >= max_write_batch_size);
        if (enough_data || enough_documents) {
            buffer_t new_buffer;
            cur_size = buffer_get_position(buffer) - cur_doc_begin;

            /* This single document is too large for the command. */
            if (!idx) {
                if (op == _INSERT) {
                    _set_document_too_large(cur_size, max_bson_size);
                } else {
                    PyObject* DocumentTooLarge = _error("DocumentTooLarge");
                    if (DocumentTooLarge) {
                        /*
                         * There's nothing intelligent we can say
                         * about size for update and remove.
                         */
                        PyErr_SetString(DocumentTooLarge,
                                        "command document too large");
                        Py_DECREF(DocumentTooLarge);
                    }
                }
                goto cmditerfail;
            }

            if (!(new_buffer = buffer_new())) {
                PyErr_NoMemory();
                goto cmditerfail;
            }
            /* New buffer including the current overflow document:
             * everything up to and including the first element's type
             * byte, then key "0", then the already encoded document. */
            if (!buffer_write_bytes(new_buffer,
                (const char*)buffer_get_buffer(buffer), lst_len_loc + 5) ||
                !buffer_write_bytes(new_buffer, "0\x00", 2) ||
                !buffer_write_bytes(new_buffer,
                (const char*)buffer_get_buffer(buffer) + cur_doc_begin, cur_size)) {
                buffer_free(new_buffer);
                goto cmditerfail;
            }
            /*
             * Roll the existing buffer back to the beginning
             * of the last document encoded.
             */
            buffer_update_position(buffer, sub_doc_begin);

            /* Terminate the array and the command document. */
            if (!buffer_write_bytes(buffer, "\x00\x00", 2)) {
                buffer_free(new_buffer);
                goto cmditerfail;
            }

            result = _send_write_command(ctx, buffer, lst_len_loc,
                                         cmd_len_loc, &errors, to_publish);

            /* From here on `buffer` is the new batch, so the failure
             * labels below free the right buffer. */
            buffer_free(buffer);
            buffer = new_buffer;

            if (!result)
                goto cmditerfail;

#if PY_MAJOR_VERSION >= 3
            result = Py_BuildValue("NN",
                                   PyLong_FromLong(idx_offset), result);
#else
            result = Py_BuildValue("NN",
                                   PyInt_FromLong(idx_offset), result);
#endif
            if (!result)
                goto cmditerfail;

            if (PyList_Append(results, result) < 0) {
                Py_DECREF(result);
                goto cmditerfail;
            }
            Py_DECREF(result);

            if (errors && ordered) {
                /* Stop at the first failing batch of an ordered command.
                 * Release everything still owned here, including the
                 * current document and the published-docs list. */
                destroy_codec_options(&options);
                Py_DECREF(doc);
                Py_DECREF(iterator);
                Py_DECREF(to_publish);
                buffer_free(buffer);
                return results;
            }
            idx_offset += idx;
            idx = 0;
            Py_DECREF(to_publish);
            if (!(to_publish = PyList_New(0))) {
                goto cmditerfail;
            }
        }
        if (PyList_Append(to_publish, doc) < 0) {
            goto cmditerfail;
        }
        Py_CLEAR(doc);
        idx += 1;
    }
    Py_DECREF(iterator);

    /* PyIter_Next returns NULL both at exhaustion and on error. */
    if (PyErr_Occurred()) {
        goto cmdfail;
    }

    if (empty) {
        PyObject* InvalidOperation = _error("InvalidOperation");
        if (InvalidOperation) {
            PyErr_SetString(InvalidOperation, "cannot do an empty bulk write");
            Py_DECREF(InvalidOperation);
        }
        goto cmdfail;
    }

    /* Terminate the array and the command document. */
    if (!buffer_write_bytes(buffer, "\x00\x00", 2))
        goto cmdfail;

    /* Send the final (or only) batch. */
    result = _send_write_command(ctx, buffer, lst_len_loc,
                                 cmd_len_loc, &errors, to_publish);
    if (!result)
        goto cmdfail;

#if PY_MAJOR_VERSION >= 3
    result = Py_BuildValue("NN", PyLong_FromLong(idx_offset), result);
#else
    result = Py_BuildValue("NN", PyInt_FromLong(idx_offset), result);
#endif
    if (!result)
        goto cmdfail;

    if (PyList_Append(results, result) < 0) {
        Py_DECREF(result);
        goto cmdfail;
    }
    Py_DECREF(result);
    Py_DECREF(to_publish);
    buffer_free(buffer);
    destroy_codec_options(&options);
    return results;

cmditerfail:
    Py_XDECREF(doc);
    Py_DECREF(iterator);
cmdfail:
    destroy_codec_options(&options);
    Py_DECREF(results);
    Py_XDECREF(to_publish);
    buffer_free(buffer);
    return NULL;
}

/* Method table for the _cmessage module: maps each Python-visible function
 * name to its C implementation. All entries take positional arguments only
 * (METH_VARARGS). The all-NULL entry terminates the table. */
static PyMethodDef _CMessageMethods[] = {
    {"_insert_message", _cbson_insert_message, METH_VARARGS,
     "Create an insert message to be sent to MongoDB"},
    {"_update_message", _cbson_update_message, METH_VARARGS,
     "create an update message to be sent to MongoDB"},
    {"_query_message", _cbson_query_message, METH_VARARGS,
     "create a query message to be sent to MongoDB"},
    {"_get_more_message", _cbson_get_more_message, METH_VARARGS,
     "create a get more message to be sent to MongoDB"},
    {"_do_batched_insert", _cbson_do_batched_insert, METH_VARARGS,
     "insert a batch of documents, splitting the batch as needed"},
    {"_do_batched_write_command", _cbson_do_batched_write_command, METH_VARARGS,
     "execute a batch of insert, update, or delete commands"},
    {NULL, NULL, 0, NULL}
};

#if PY_MAJOR_VERSION >= 3
#define INITERROR return NULL
/* Traversal hook registered in moduledef: visits the _cbson module
 * reference held in this module's state. Always returns 0 unless the
 * visit callback reports otherwise (Py_VISIT returns its non-zero result).
 * The parameter names `visit` and `arg` are required by Py_VISIT. */
static int _cmessage_traverse(PyObject *m, visitproc visit, void *arg) {
    Py_VISIT(GETSTATE(m)->_cbson);
    return 0;
}

/* Clear hook registered in moduledef: drops the module state's reference
 * to the _cbson module and resets the pointer to NULL. Always returns 0. */
static int _cmessage_clear(PyObject *m) {
    Py_CLEAR(GETSTATE(m)->_cbson);
    return 0;
}

/* Python 3 module definition for _cmessage. Fields are given positionally
 * in PyModuleDef declaration order. */
static struct PyModuleDef moduledef = {
        PyModuleDef_HEAD_INIT,
        "_cmessage",                  /* m_name */
        NULL,                         /* m_doc: no module docstring */
        sizeof(struct module_state),  /* m_size: per-module state */
        _CMessageMethods,             /* m_methods */
        NULL,                         /* m_reload / m_slots (unused) */
        _cmessage_traverse,           /* m_traverse */
        _cmessage_clear,              /* m_clear */
        NULL                          /* m_free */
};

/* Module initialization: imports bson._cbson, loads its C API table into
 * _cbson_API, creates the _cmessage module and stores the _cbson module
 * reference in the module state.
 *
 * On Python 3 returns the new module (NULL on failure); on Python 2 the
 * function returns nothing. */
PyMODINIT_FUNC
PyInit__cmessage(void)
#else
#define INITERROR return
PyMODINIT_FUNC
init_cmessage(void)
#endif
{
    PyObject *cbson_module = NULL;
    PyObject *capsule = NULL;
    PyObject *module;

    /* Keep a reference to the _cbson module: it is needed to call some
     * of its functions.
     */
    cbson_module = PyImport_ImportModule("bson._cbson");
    if (!cbson_module) {
        goto fail;
    }

    /* Fetch the C API exported by _cbson.
     * The header file reads _cbson_API to reach the functions.
     */
    capsule = PyObject_GetAttrString(cbson_module, "_C_API");
    if (!capsule) {
        goto fail;
    }
#if PY_VERSION_HEX >= 0x03010000
    _cbson_API = (void **)PyCapsule_GetPointer(capsule, "_cbson._C_API");
#else
    _cbson_API = (void **)PyCObject_AsVoidPtr(capsule);
#endif
    if (!_cbson_API) {
        goto fail;
    }

#if PY_MAJOR_VERSION >= 3
    module = PyModule_Create(&moduledef);
#else
    module = Py_InitModule("_cmessage", _CMessageMethods);
#endif
    if (!module) {
        goto fail;
    }

    /* The module state takes over the reference to _cbson. */
    GETSTATE(module)->_cbson = cbson_module;

    Py_DECREF(capsule);

#if PY_MAJOR_VERSION >= 3
    return module;
#else
    return;
#endif

fail:
    /* Shared error exit: release whatever was acquired so far. */
    Py_XDECREF(capsule);
    Py_XDECREF(cbson_module);
    INITERROR;
}
